/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.seata.server.console.impl.file;

import org.apache.seata.common.result.PageResult;
import org.apache.seata.common.result.SingleResult;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.lock.RowLock;
import org.apache.seata.server.console.entity.bo.GlobalLockQueryBO;
import org.apache.seata.server.console.entity.param.GlobalLockParam;
import org.apache.seata.server.console.entity.vo.GlobalLockVO;
import org.apache.seata.server.console.exception.ConsoleException;
import org.apache.seata.server.console.impl.AbstractLockService;
import org.apache.seata.server.console.impl.redis.GlobalLockRedisServiceImpl;
import org.apache.seata.server.console.service.GlobalLockService;
import org.apache.seata.server.lock.LockerManagerFactory;
import org.apache.seata.server.session.BranchSession;
import org.apache.seata.server.session.GlobalSession;
import org.apache.seata.server.session.SessionHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Objects.isNull;
import static org.apache.seata.common.util.StringUtils.isBlank;
import static org.apache.seata.server.console.entity.vo.GlobalLockVO.convert;

/**
 * Global Lock File ServiceImpl
 *
 */
@Component
@org.springframework.context.annotation.Configuration
@ConditionalOnExpression("#{'file'.equals('${lockMode}')}")
public class GlobalLockFileServiceImpl extends AbstractLockService implements GlobalLockService {
    private static final Logger LOGGER = LoggerFactory.getLogger(GlobalLockRedisServiceImpl.class);

    @Override
    public PageResult<GlobalLockVO> query(GlobalLockParam param) {
        checkParam(param);

        final Collection<GlobalSession> allSessions =
                SessionHolder.getRootSessionManager().allSessions();

        List<GlobalLockQueryBO> result = allSessions.parallelStream()
                .filter(obtainGlobalSessionPredicate(param))
                .flatMap(globalSession -> globalSession.getBranchSessions().stream()
                        .filter(obtainBranchSessionPredicate(param))
                        .flatMap(branchSession -> filterAndMap(param, branchSession)
                                .map(lock -> new GlobalLockQueryBO(lock, globalSession))))
                .collect(Collectors.toList());

        return PageResult.build(convert(result), param.getPageNum(), param.getPageSize());
    }

    @Override
    public SingleResult<Void> deleteLock(GlobalLockParam param) {
        checkDeleteLock(param);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("start to delete global lock,xid:{} branchId:{}", param.getXid(), param.getBranchId());
        }
        GlobalSession globalSession = SessionHolder.getRootSessionManager().findGlobalSession(param.getXid(), true);
        if (globalSession != null) {
            List<BranchSession> branchSessions = globalSession.getBranchSessions().stream()
                    .filter(branchSession -> branchSession.getBranchId() == Long.parseLong(param.getBranchId()))
                    .collect(Collectors.toList());

            if (branchSessions.size() != 1) {
                throw new ConsoleException(
                        new UnsupportedOperationException("branch session size is not one"),
                        String.format(
                                "delete global lock," + "xid:%s ,branchId:%s", param.getXid(), param.getBranchId()));
            }
            try {
                lockManager.releaseLock(branchSessions.get(0));
            } catch (TransactionException e) {
                throw new ConsoleException(
                        e,
                        String.format(
                                "delete global lock," + "xid:%s ,branchId:%s", param.getXid(), param.getBranchId()));
            }
        }
        return SingleResult.success();
    }

    /**
     * filter with tableName and generate RowLock
     *
     * @param param the query param
     * @param branchSession the branch session
     * @return the RowLock list
     */
    private Stream<RowLock> filterAndMap(GlobalLockParam param, BranchSession branchSession) {
        if (CollectionUtils.isEmpty(branchSession.getLockHolder())) {
            return Stream.empty();
        }

        final String tableName = param.getTableName();

        // get rowLock from branchSession
        final List<RowLock> rowLocks = LockerManagerFactory.getLockManager().collectRowLocks(branchSession);

        if (StringUtils.isNotBlank(tableName)) {
            return rowLocks.parallelStream()
                    .filter(rowLock -> rowLock.getTableName().contains(param.getTableName()));
        }

        return rowLocks.stream();
    }

    /**
     * check the param
     *
     * @param param the param
     */
    private void checkParam(GlobalLockParam param) {
        if (param.getPageSize() <= 0 || param.getPageNum() <= 0) {
            throw new IllegalArgumentException("wrong pageSize or pageNum");
        }

        // verification data type
        try {
            Long.parseLong(param.getTransactionId());
        } catch (NumberFormatException e) {
            param.setTransactionId(null);
        }
        try {
            Long.parseLong(param.getBranchId());
        } catch (NumberFormatException e) {
            param.setBranchId(null);
        }
    }

    /**
     * obtain the branch session condition
     *
     * @param param condition for query branch session
     * @return the filter condition
     */
    private Predicate<? super BranchSession> obtainBranchSessionPredicate(GlobalLockParam param) {
        return branchSession -> {
            // transactionId
            return (isBlank(param.getTransactionId())
                            || String.valueOf(branchSession.getTransactionId()).contains(param.getTransactionId()))
                    &&
                    // branch id
                    (isBlank(param.getBranchId())
                            || String.valueOf(branchSession.getBranchId()).contains(param.getBranchId()));
        };
    }

    /**
     * obtain the global session condition
     *
     * @param param condition for query global session
     * @return the filter condition
     */
    private Predicate<? super GlobalSession> obtainGlobalSessionPredicate(GlobalLockParam param) {

        return globalSession -> {
            // first, there must be withBranchSession
            return CollectionUtils.isNotEmpty(globalSession.getBranchSessions())
                    &&
                    // The second is other conditions
                    // xid
                    (isBlank(param.getXid()) || globalSession.getXid().contains(param.getXid()))
                    &&
                    // timeStart
                    (isNull(param.getTimeStart()) || param.getTimeStart() / 1000 >= globalSession.getBeginTime() / 1000)
                    &&
                    // timeEnd
                    (isNull(param.getTimeEnd()) || param.getTimeEnd() / 1000 <= globalSession.getBeginTime() / 1000);
        };
    }
}
